[新機能]クエリサイズを日時で分割しbackfill処理も行いやすいdbt incremental modelの「microbatch」strategyを試してみた
さがらです。
dbt ver1.9の新機能であるmicrobatch incremental modelsを試してみたので、本記事でその内容をまとめてみます。
microbatch incremental modelsとは
まず、incremental modelsについてですが、dbtで前回更新時からの差分更新を行うことができるmodelとなっています。
その上で、このincremental modelsではappend
、merge
、delete+insert
、insert_overwrite
という4つの増分更新方法がこれまで提供されていました。
この増分更新方法の新しいオプションとして追加されたのが、microbatch
となります。
microbatch
での更新時の挙動ですが、指定したカラムの値を元に日別に分けて、複数の更新処理が走るイメージとなります。(下図は上記のリンク先ドキュメントからの引用です。)
また、日別に処理を管理できる仕様のためdbt run・build
のオプションとして下記のように--event-time-start
と--event-time-end
のオプションを設定すると、過去の一区間のデータだけ更新されたときに、その区間だけ差分更新させるということが容易に実現できます。「backfill」と呼ばれる類の処理となります。(下図は上記のリンク先ドキュメントからの引用です。)
これは今までのincremental modelではできなかったことで、ソースデータの誤りを修正した場合やビジネスロジックが変更したときに過去データにも適用させたい場合など、活用場面は多いと思います。
dbt run --event-time-start "2024-09-01" --event-time-end "2024-09-04"
事前準備
データの準備
事前に、以下のクエリを実行し、検証用データを準備しておきます。(DWHはSnowflakeです。クエリはClaude 3.5 Sonnetに作成してもらいました。)
create or replace table ec_sales (
product_id integer,
product_name varchar(100),
quantity integer,
sale_amount decimal(10,2),
sold_at timestamp,
updated_at timestamp default current_timestamp()
);
insert into ec_sales (product_id, product_name, quantity, sale_amount, sold_at, updated_at)
values
(1001, 'スマートフォン', 2, 159980.00, '2024-10-01 10:30:00'::timestamp, '2024-10-01 11:00:00'::timestamp),
(1002, 'ワイヤレスイヤホン', 5, 49950.00, '2024-10-01 14:45:00'::timestamp, '2024-10-01 15:00:00'::timestamp),
(1003, 'ノートパソコン', 1, 129800.00, '2024-10-02 09:15:00'::timestamp, '2024-10-02 09:30:00'::timestamp),
(1004, 'タブレット', 3, 149970.00, '2024-10-02 16:20:00'::timestamp, '2024-10-02 16:45:00'::timestamp),
(1005, 'スマートウォッチ', 4, 79960.00, '2024-10-03 11:00:00'::timestamp, '2024-10-03 11:15:00'::timestamp),
(1006, 'ポータブル充電器', 10, 29900.00, '2024-10-03 15:30:00'::timestamp, '2024-10-03 15:45:00'::timestamp),
(1007, 'ゲーミングマウス', 2, 15980.00, '2024-10-04 13:45:00'::timestamp, '2024-10-04 14:00:00'::timestamp),
(1008, 'メカニカルキーボード', 1, 12980.00, '2024-10-04 17:10:00'::timestamp, '2024-10-04 17:30:00'::timestamp),
(1009, 'ワイヤレススピーカー', 3, 35970.00, '2024-10-05 10:00:00'::timestamp, '2024-10-05 10:15:00'::timestamp),
(1010, 'アクションカメラ', 2, 79980.00, '2024-10-05 14:30:00'::timestamp, '2024-10-05 14:45:00'::timestamp);
select * from ec_sales order by sold_at, product_id;
dbt上の定義
dbt上で「source」と「ステージングレイヤーのModelとyaml」も下記のように定義しておきます。
特に、この後作成するIncremental Modelから参照されるModelに対するyamlでconfig: event_time:
を設定しておくことが非常に重要です。これがないと、microbatchのmodelを実行した際に、適切にWHERE句が入りません。
- sources.yml
version: 2
sources:
- name: microbatch_test
database: SAGARA_RAWDATA_DB
tables:
- name: ec_sales
- stg_ec_sales.sql
select
product_id,
product_name,
quantity,
sale_amount,
sold_at,
updated_at
from
{{ source('microbatch_test','ec_sales') }}
- schema.yml
version: 2
models:
- name: stg_ec_sales
config:
event_time: sold_at
dbt Cloud上でEnviroment Variableを定義 ※Beta期間中のみ
私が試した2024年10月11日時点ではまだBeta版のため、Enviroment Variableを定義してこの機能を有効化しておく必要があります。
具体的には、下図のようにDBT_EXPERIMENTAL_MICROBATCH
をTrue
に設定してください。
microbatch incremental modelsを定義してbuild
dbt上で下記の内容の新しいModelを作成します。
- ec_sales_incremental.sql
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='sold_at',
begin='2024-10-01',
lookback=4,
batch_size='day'
) }}
select
product_id,
product_name,
quantity,
sale_amount,
sold_at,
updated_at
from
{{ ref('stg_ec_sales') }}
config
内の各パラメータはこのような内容となっています。
incremental_strategy
:'microbatch'と記入。event_time
:このconfig
の対象のModelの中で、バッチの粒度を分けたい、DATE型やTIMESTAMP型のカラム名を記入。begin
:このincremental modelを初めてビルドする際、どの日付からのレコードをビルドの対象するか記入。lookback
:dbt run/build
の実行日から、何日分遡ってビルドを行うか記入。(この記事の後半でもこのlookback
の挙動について実例交えて説明しています。)- デフォルトは0で、0にすると
dbt run/build
の実行日しかbuild対象とならない。 - 実行日の起算はUTCベースのため、日本ユーザーの方は日本時間午前9時より前に実行する際はご注意ください。
- デフォルトは0で、0にすると
batch_size
:microbatchのビルド処理の実行粒度を記入。- 2024年10月11日時点では
day
しか選択できない仕様となっているためご注意ください。
- 2024年10月11日時点では
この後、dbt run -s +ec_sales_incremental
を実行して一度テーブルを作成してみます。
すると、下図のように実行ログが残ります。これがmicrobatch特有の仕様で、begin
パラメータで定義した日付からdbt runを実行した日まで、日別にWHERE句が入ったクエリが実行されます。
新しいデータを6日分入れてbuild
この後、下記のSQLをSnowflake上で実行し、6日分の新しいレコードを追加します。
insert into ec_sales (product_id, product_name, quantity, sale_amount, sold_at, updated_at)
values
(1011, '4Kモニター', 2, 89980.00, '2024-10-06 09:15:00'::timestamp, '2024-10-06 09:30:00'::timestamp),
(1012, 'ゲーミングヘッドセット', 3, 44970.00, '2024-10-06 14:20:00'::timestamp, '2024-10-06 14:35:00'::timestamp),
(1013, 'ワイヤレスマウス', 5, 24950.00, '2024-10-07 10:45:00'::timestamp, '2024-10-07 11:00:00'::timestamp),
(1014, '外付けSSD', 2, 39980.00, '2024-10-07 16:30:00'::timestamp, '2024-10-07 16:45:00'::timestamp),
(1015, 'ウェブカメラ', 4, 31960.00, '2024-10-08 11:20:00'::timestamp, '2024-10-08 11:35:00'::timestamp),
(1016, 'ブルートゥーススピーカー', 3, 26970.00, '2024-10-08 15:50:00'::timestamp, '2024-10-08 16:05:00'::timestamp),
(1017, 'ゲーミングチェア', 1, 49990.00, '2024-10-09 10:30:00'::timestamp, '2024-10-09 10:45:00'::timestamp),
(1018, 'メカニカルキーボード', 2, 29980.00, '2024-10-09 15:15:00'::timestamp, '2024-10-09 15:30:00'::timestamp),
(1019, 'ノイズキャンセリングヘッドホン', 3, 89970.00, '2024-10-10 09:45:00'::timestamp, '2024-10-10 10:00:00'::timestamp),
(1020, 'ポータブルSSD', 4, 59960.00, '2024-10-10 14:20:00'::timestamp, '2024-10-10 14:35:00'::timestamp),
(1021, 'スマートホームハブ', 2, 39980.00, '2024-10-11 11:10:00'::timestamp, '2024-10-11 11:25:00'::timestamp),
(1022, 'ドローン', 1, 79990.00, '2024-10-11 16:40:00'::timestamp, '2024-10-11 16:55:00'::timestamp);
この上で、再度dbt run -s +ec_sales_incremental
を実行してみます。
すると、今度は10月7日~10月11日の5日分のデータに対して日別にクエリが実行されました。
これは、sqlファイル上のconfig
でlookback=4
としていたためであり、dbt runの実行日である10月11日から4日分遡って、10月10日、10月9日、10月8日、10月7日、が処理対象となっていたからです。
このため、先ほど6日分INSERTしたうちの1日目である10月6日分のデータは漏れた形でデータが入っています。(下図参照)
この次の章で、10月6日分が漏れたリカバリを行っていきます。
10月6日分のデータをリカバリ(backfill)
10月6日分のデータが漏れた分をリカバリするためには、dbt run -s +ec_sales_incremental --event-time-start "2024-10-06" --event-time-end "2024-10-07"
という形でリカバリしたい期間を指定してコマンドを実行します。
このコマンドの実行により、下図のようにクエリが実行されます。
これで、10月6日分のデータがリカバリができました!
注意点としては、2024年10月11日時点ではTIMESTAMP型のカラムをevent_time
に指定していると、リカバリしたい期間の1日後まで指定しないと正しくWHERE句が指定されません。
参考までに、dbt run -s +ec_sales_incremental --event-time-start "2024-10-06" --event-time-end "2024-10-06"
を実行すると下図のようなWHERE句でクエリが実行されてしまい、全く意味のないクエリとなってしまいます。
最後に
dbt ver1.9の新機能であるmicrobatch incremental modelsを試してみました。
個人的に、backfillが簡単にできることがとても気に入りました!これまでのincremental modelだと過去のある1日だけでもデータの変更があると、full-refleshで再度作り直しを行うか、レコードを直接修正、一時的にクエリの内容を修正、などする必要があり大変だったので…
また、日別に細かいクエリを実行する形となるため、例えばSnowflakeでいうとより小さいウェアハウスサイズにできる可能性もあります。マルチクラスターウェアハウスの恩恵も受けやすくなりそうですね。